之前分析了controller的主要功能,在分析管理network时,并未对network作出详细分析。本次将对network作出分析。

network

network代表一个网络,网络中可以放入endpoint。在bridge模式下,可以把network理解成bridge。network结构体定义在/libnetwork/network.go中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// network is the libnetwork-internal state kept for a single network.
// The field notes below only describe how the methods in this file's
// listings use each field; fields without a note are not exercised there.
type network struct {
ctrlr *controller // controller this network belongs to; read under the lock in delete()
name string // network name; used in error and log messages
networkType string // driver name handed to resolveDriver() by driver()
id string // network ID; passed to the driver's CreateEndpoint()/DeleteEndpoint()
scope string // value returned by Scope(); rewritten by driver()
labels map[string]string
ipamType string // IPAM driver name passed to getIPAMDriver() in CreateEndpoint()
ipamOptions map[string]string
addrSpace string
ipamV4Config []*IpamConf
ipamV6Config []*IpamConf
ipamV4Info []*IpamInfo
ipamV6Info []*IpamInfo
enableIPv6 bool // combined with postIPv6 in CreateEndpoint() to choose when the IPv6 address is assigned
postIPv6 bool
epCnt *endpointCnt // presumably what getEpCnt() returns (endpoint counter) — TODO confirm
generic options.Generic
dbIndex uint64
dbExists bool
persist bool
stopWatchCh chan struct{}
drvOnce *sync.Once
internal bool
inDelete bool // set to true by delete() before the network is torn down
ingress bool
driverTables []string
dynamic bool // when true, driver() forces scope to datastore.LocalScope
sync.Mutex // locked by Scope(), driver() and delete() around field access
}

我们把network的功能分为network本身操作和endpoint管理两部分。

network本身操作

network::Scope()

Scope()返回network的scope。

1
2
3
4
5
6
7
8
9
// DataScope returns the network's scope; it is a thin delegate to Scope().
func (n *network) DataScope() string {
return n.Scope()
}
// Scope reports the scope string currently recorded on the network.
// The field is read while the network mutex is held.
func (n *network) Scope() string {
	n.Lock()
	current := n.scope
	n.Unlock()
	return current
}

network::Delete()

Delete()可以删除本network。Delete()主要调用了delete(),delete()流程如下:

  1. 从store中获取最新的network;
  2. 调用deleteNetwork()方法删除底层的network;
  3. 释放IPAM;
  4. 从store中删除endpointCount和network。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
// Delete removes this network. It calls delete() with force set to false,
// so (per delete()) the call fails while endpoints are still counted on the
// network.
func (n *network) Delete() error {
return n.delete(false)
}
// delete is the implementation behind Delete().
//
// It re-reads the network from the store by id, refuses to continue while the
// endpoint count is non-zero (unless force is set), marks the network as being
// deleted, asks the driver to remove it, releases IPAM resources, and finally
// removes the endpoint-count object and the network itself from the store.
//
// With force set, a driver failure and a failure to delete the endpoint count
// are only logged at debug level instead of being returned.
func (n *network) delete(force bool) error {
// Snapshot the identifying fields under the lock.
n.Lock()
c := n.ctrlr
name := n.name
id := n.id
n.Unlock()
// Fetch the latest copy of the network from the store; from here on n
// refers to that copy, not to the receiver the caller passed in.
n, err := c.getNetworkFromStore(id)
if err != nil {
return &UnknownNetworkError{name: name, id: id}
}
if !force && n.getEpCnt().EndpointCnt() != 0 {
return &ActiveEndpointsError{name: n.name, id: n.id}
}
// Mark the network for deletion
n.inDelete = true
if err = c.updateToStore(n); err != nil {
return fmt.Errorf("error marking network %s (%s) for deletion: %v", n.Name(), n.ID(), err)
}
// Ask the driver to delete the underlying network via deleteNetwork().
if err = n.deleteNetwork(); err != nil {
if !force {
return err
}
log.Debugf("driver failed to delete stale network %s (%s): %v", n.Name(), n.ID(), err)
}
// Release the IPAM resources; a failure to persist the result is only logged.
n.ipamRelease()
if err = c.updateToStore(n); err != nil {
log.Warnf("Failed to update store after ipam release for network %s (%s): %v", n.Name(), n.ID(), err)
}
// We are about to delete the network. Leave the gossip
// cluster for the network to stop all incoming network
// specific gossip updates before cleaning up all the service
// bindings for the network. But cleanup service binding
// before deleting the network from the store since service
// bindings cleanup requires the network in the store.
n.cancelDriverWatches()
if err = n.leaveCluster(); err != nil {
log.Errorf("Failed leaving network %s from the agent cluster: %v", n.Name(), err)
}
c.cleanupServiceBindings(n.ID())
// deleteFromStore performs an atomic delete operation and the
// network.epCnt will help prevent any possible
// race between endpoint join and network delete
// Remove the endpoint-count object from the store first.
if err = c.deleteFromStore(n.getEpCnt()); err != nil {
if !force {
return fmt.Errorf("error deleting network endpoint count from store: %v", err)
}
log.Debugf("Error deleting endpoint count from store for stale network %s (%s) for deletion: %v", n.Name(), n.ID(), err)
}
// Finally remove the network object itself from the store.
if err = c.deleteFromStore(n); err != nil {
return fmt.Errorf("error deleting network from store: %v", err)
}
return nil
}

network::deleteNetwork()

deleteNetwork()通过调用driver的DeleteNetwork()删除底层的network。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// deleteNetwork asks the backing driver to remove the network identified by
// n.ID().
//
// An error is returned when the driver cannot be resolved, or when the driver
// reports a types.ForbiddenError. Every other driver error is absorbed: it is
// logged as a warning unless it is a types.MaskableError, and nil is returned.
func (n *network) deleteNetwork() error {
	drv, err := n.driver(true)
	if err != nil {
		return fmt.Errorf("failed deleting network: %v", err)
	}

	delErr := drv.DeleteNetwork(n.ID())
	if delErr == nil {
		return nil
	}

	switch delErr.(type) {
	case types.ForbiddenError:
		// Forbidden errors must reach the caller.
		return delErr
	case types.MaskableError:
		// Maskable errors are dropped without logging.
	default:
		log.Warnf("driver error deleting network %s : %v", n.name, delErr)
	}
	return nil
}

network::driver()

driver()返回network对应的driver。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
// driver returns the driver registered for n.networkType, as resolved by
// resolveDriver. When load is false and no driver is registered, both the
// returned driver and the error are nil.
//
// As a side effect the network's scope is refreshed while holding the lock:
// it takes the DataScope of the driver capability when one was returned, and
// is then forced to datastore.LocalScope if the controller is an agent or the
// network is dynamic.
func (n *network) driver(load bool) (driverapi.Driver, error) {
	drv, capability, err := n.resolveDriver(n.networkType, load)
	if err != nil {
		return nil, err
	}

	agentMode := n.getController().isAgent()

	n.Lock()
	defer n.Unlock()

	// capability is nil when no driver was resolved and loading was not requested.
	if capability != nil {
		n.scope = capability.DataScope
	}
	// In agent mode (or for dynamic networks) every network is local scope
	// regardless of what the backing driver advertises.
	if agentMode || n.dynamic {
		n.scope = datastore.LocalScope
	}
	return drv, nil
}
// resolveDriver looks up the driver called name, together with its
// capability, in the controller's driver registry.
//
// If the driver is not registered and load is true, the controller is asked
// to load it and the registry is consulted again; a failure to load, or a
// driver that is still missing afterwards, is returned as an error. If the
// driver is not registered and load is false, all three results are nil.
func (n *network) resolveDriver(name string, load bool) (driverapi.Driver, *driverapi.Capability, error) {
	ctrl := n.getController()

	drv, capability := ctrl.drvRegistry.Driver(name)
	if drv != nil {
		return drv, capability, nil
	}

	// Not registered: do not fail when loading was not requested.
	if !load {
		return nil, nil, nil
	}

	if loadErr := ctrl.loadDriver(name); loadErr != nil {
		return nil, nil, loadErr
	}

	drv, capability = ctrl.drvRegistry.Driver(name)
	if drv == nil {
		return nil, nil, fmt.Errorf("could not resolve driver %s in registry", name)
	}
	return drv, capability, nil
}

管理endpoint

network通过store管理endpoint。

network::EndpointByID()

EndpointByID()可以通过id获取endpoint。

1
2
3
4
5
6
7
8
9
10
11
12
// EndpointByID fetches the endpoint with the given id from the store.
// An empty id yields ErrInvalidID; any store lookup failure is reported as
// ErrNoSuchEndpoint.
func (n *network) EndpointByID(id string) (Endpoint, error) {
	if len(id) == 0 {
		return nil, ErrInvalidID(id)
	}

	found, lookupErr := n.getEndpointFromStore(id)
	if lookupErr != nil {
		return nil, ErrNoSuchEndpoint(id)
	}
	return found, nil
}

network::EndpointByName()

EndpointByName()依据name获取endpoint。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// EndpointByName returns the first endpoint, in WalkEndpoints order, whose
// Name() equals name. An empty name yields ErrInvalidName; if no endpoint
// matches, ErrNoSuchEndpoint is returned.
func (n *network) EndpointByName(name string) (Endpoint, error) {
	if len(name) == 0 {
		return nil, ErrInvalidName(name)
	}

	var match Endpoint
	n.WalkEndpoints(func(candidate Endpoint) bool {
		if candidate.Name() != name {
			return false
		}
		// Remember the hit and stop the walk.
		match = candidate
		return true
	})

	if match != nil {
		return match, nil
	}
	return nil, ErrNoSuchEndpoint(name)
}

network::Endpoints()

Endpoints()可以从network中获取endpoint列表。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// Endpoints returns the endpoints obtained from getEndpointsFromStore() as a
// slice of the Endpoint interface. A store error is logged and not returned;
// whatever the store call handed back is still converted.
func (n *network) Endpoints() []Endpoint {
	stored, err := n.getEndpointsFromStore()
	if err != nil {
		log.Error(err)
	}

	var result []Endpoint
	for i := range stored {
		result = append(result, stored[i])
	}
	return result
}

network::WalkEndpoints()

WalkEndpoints()会依次遍历所有endpoint,并对每个endpoint调用walker(),直到walker()返回true为止。

1
2
3
4
5
6
7
// WalkEndpoints calls walker on each endpoint returned by Endpoints(), in
// order, and stops as soon as walker returns true.
func (n *network) WalkEndpoints(walker EndpointWalker) {
	all := n.Endpoints()
	for i := 0; i < len(all); i++ {
		if walker(all[i]) {
			break
		}
	}
}

network::CreateEndpoint()

CreateEndpoint()创建一对vethpair,并把vethpair的一端绑定到network中。流程如下:

  1. 验证同名endpoint是否已存在,如果存在,则返回错误;
  2. 生成endpoint结构体值;
  3. 设置IP;
  4. 调用addEndpoint;
  5. 存储endpoint。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
// CreateEndpoint creates a new endpoint called name on this network and
// returns it.
//
// Steps, in order: validate the name; reject a name already used by an
// endpoint on this network; build the endpoint object and refresh the network
// from the store; apply the caller's options; resolve the IPAM driver and
// assign addresses; call addEndpoint() (which invokes the driver); persist
// the endpoint; start watching service records; increment the network's
// endpoint count.
//
// Each completed step registers a deferred rollback that fires when the
// function-scope err is non-nil on return, so a later failure undoes the
// earlier steps.
func (n *network) CreateEndpoint(name string, options ...EndpointOption) (Endpoint, error) {
var err error
if !config.IsValidName(name) {
return nil, ErrInvalidName(name)
}
// A nil error from EndpointByName means an endpoint with this name already
// exists, which is reported as a forbidden error.
if _, err = n.EndpointByName(name); err == nil {
return nil, types.ForbiddenErrorf("service endpoint with name %s already exists", name)
}
// Build the endpoint value and give it a random ID.
ep := &endpoint{name: name, generic: make(map[string]interface{}), iface: &endpointInterface{}}
ep.id = stringid.GenerateRandomID()
// Initialize ep.network with a possibly stale copy of n. We need this to get network from
// store. But once we get it from store we will have the most uptodate copy possibly.
ep.network = n
ep.locator = n.getController().clusterHostID()
ep.network, err = ep.getNetworkFromStore()
if err != nil {
return nil, fmt.Errorf("failed to get network during CreateEndpoint: %v", err)
}
// From here on n is the copy just read from the store.
n = ep.network
ep.processOptions(options...)
// Every configured link-local address must be a link-local unicast address.
for _, llIPNet := range ep.Iface().LinkLocalAddresses() {
if !llIPNet.IP.IsLinkLocalUnicast() {
return nil, types.BadRequestErrorf("invalid link local IP address: %v", llIPNet.IP)
}
}
// Use a MAC address supplied through the generic options, if it has the expected type.
if opt, ok := ep.generic[netlabel.MacAddress]; ok {
if mac, ok := opt.(net.HardwareAddr); ok {
ep.iface.mac = mac
}
}
// Resolve the IPAM driver (and its capability) configured for this network.
ipam, cap, err := n.getController().getIPAMDriver(n.ipamType)
if err != nil {
return nil, err
}
// When the IPAM driver requires a MAC address, make sure one exists
// (generating a random one if needed) and pass it on via the IPAM options.
if cap.RequiresMACAddress {
if ep.iface.mac == nil {
ep.iface.mac = netutils.GenerateRandomMAC()
}
if ep.ipamOptions == nil {
ep.ipamOptions = make(map[string]string)
}
ep.ipamOptions[netlabel.MacAddress] = ep.iface.mac.String()
}
// Original author's note: allocate and set the IPv4 address.
// NOTE(review): the third argument is true only when IPv6 is enabled and
// postIPv6 is false; presumably it requests the IPv6 address in this same
// call — confirm against assignAddress().
if err = ep.assignAddress(ipam, true, n.enableIPv6 && !n.postIPv6); err != nil {
return nil, err
}
// Rollback: release the address if a later step fails.
defer func() {
if err != nil {
ep.releaseAddress()
}
}()
// Create the endpoint through the driver (see addEndpoint()).
if err = n.addEndpoint(ep); err != nil {
return nil, err
}
// Rollback: delete the driver-side endpoint if a later step fails.
defer func() {
if err != nil {
if e := ep.deleteEndpoint(false); e != nil {
log.Warnf("cleaning up endpoint failed %s : %v", name, e)
}
}
}()
// Original author's note: allocate and set the IPv6 address.
// NOTE(review): here the third argument is true only when IPv6 is enabled
// and postIPv6 is true — confirm against assignAddress().
if err = ep.assignAddress(ipam, false, n.enableIPv6 && n.postIPv6); err != nil {
return nil, err
}
// Persist the endpoint in the store.
if err = n.getController().updateToStore(ep); err != nil {
return nil, err
}
// Rollback: remove the endpoint from the store if a later step fails.
defer func() {
if err != nil {
if e := n.getController().deleteFromStore(ep); e != nil {
log.Warnf("error rolling back endpoint %s from store: %v", name, e)
}
}
}()
// Watch for service records
n.getController().watchSvcRecord(ep)
defer func() {
if err != nil {
n.getController().unWatchSvcRecord(ep)
}
}()
// Increment endpoint count to indicate completion of endpoint addition
if err = n.getEpCnt().IncEndpointCnt(); err != nil {
return nil, err
}
return ep, nil
}

network::addEndpoint()

addEndpoint()调用driver的CreateEndpoint()生成endpoint。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// addEndpoint resolves the network's driver (loading it if necessary) and
// asks it to create ep, passing the network ID, the endpoint ID, the
// endpoint's interface and its generic options. A driver failure is wrapped
// in an internal error that names both the endpoint and the network.
func (n *network) addEndpoint(ep *endpoint) error {
	drv, err := n.driver(true)
	if err != nil {
		return fmt.Errorf("failed to add endpoint: %v", err)
	}

	if createErr := drv.CreateEndpoint(n.id, ep.id, ep.Interface(), ep.generic); createErr != nil {
		return types.InternalErrorf("failed to create endpoint %s on network %s: %v",
			ep.Name(), n.Name(), createErr)
	}
	return nil
}

以bridge为例,driver的CreateEndpoint()定义在/libnetwork/drivers/bridge/bridge.go中:

driver::CreateEndpoint()

CreateEndpoint()的流程如下:

  1. 获取network;
  2. 获取endpoint;
  3. 生成hostIfName, containerIfName;
  4. 创建vethpair;
  5. 把vethpair的host端添加到bridge上;
  6. 把vethpair的sandbox端的信息保存在endpoint中;
  7. 启用vethpair的host端;
  8. 更新endpoint。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
// CreateEndpoint creates the bridge-driver endpoint eid on network nid.
//
// It looks up the network, verifies that no endpoint with this ID exists yet,
// registers a new bridgeEndpoint, creates a veth pair, attaches the host end
// to the bridge, records the sandbox end's name and addresses on the
// endpoint, brings the host end up, optionally derives an IPv6 address from
// the MAC address, and finally saves the endpoint via storeUpdate().
//
// ifInfo must not be nil. epOptions is parsed by parseEndpointOptions().
// Deferred cleanups remove the registered endpoint and delete the created
// links when the function-scope err is non-nil on return.
func (d *driver) CreateEndpoint(nid, eid string, ifInfo driverapi.InterfaceInfo, epOptions map[string]interface{}) error {
defer osl.InitOSContext()()
if ifInfo == nil {
return errors.New("invalid interface info passed")
}
// Get the network handler and make sure it exists
d.Lock()
// Look up the network and snapshot the driver configuration under the lock.
n, ok := d.networks[nid]
dconfig := d.config
d.Unlock()
if !ok {
return types.NotFoundErrorf("network %s does not exist", nid)
}
if n == nil {
return driverapi.ErrNoNetwork(nid)
}
// Sanity check
n.Lock()
if n.id != nid {
n.Unlock()
return InvalidNetworkIDError(nid)
}
n.Unlock()
// Check if endpoint id is good and retrieve correspondent endpoint
// An endpoint that already exists under this ID is an error (see below).
ep, err := n.getEndpoint(eid)
if err != nil {
return err
}
// Endpoint with that id exists either on desired or other sandbox
if ep != nil {
return driverapi.ErrEndpointExists(eid)
}
// Try to convert the options to endpoint configuration
epConfig, err := parseEndpointOptions(epOptions)
if err != nil {
return err
}
// Create and add the endpoint
n.Lock()
endpoint := &bridgeEndpoint{id: eid, nid: nid, config: epConfig}
n.endpoints[eid] = endpoint
n.Unlock()
// On failure make sure to remove the endpoint
defer func() {
if err != nil {
n.Lock()
delete(n.endpoints, eid)
n.Unlock()
}
}()
// Generate a name for what will be the host side pipe interface
hostIfName, err := netutils.GenerateIfaceName(d.nlh, vethPrefix, vethLen)
if err != nil {
return err
}
// Generate a name for what will be the sandbox side pipe interface
containerIfName, err := netutils.GenerateIfaceName(d.nlh, vethPrefix, vethLen)
if err != nil {
return err
}
// Generate and add the interface pipe host <-> sandbox
// This creates the veth pair. Example names observed by the article
// author: hostIfName veth9c84272, containerIfName veth50870f7.
veth := &netlink.Veth{
LinkAttrs: netlink.LinkAttrs{Name: hostIfName, TxQLen: 0},
PeerName: containerIfName}
if err = d.nlh.LinkAdd(veth); err != nil {
return types.InternalErrorf("failed to add the host (%s) <=> sandbox (%s) pair interfaces: %v", hostIfName, containerIfName, err)
}
// Get the host side pipe interface handler
host, err := d.nlh.LinkByName(hostIfName)
if err != nil {
return types.InternalErrorf("failed to find host side interface %s: %v", hostIfName, err)
}
// Cleanup: delete the host-side link if a later step fails.
defer func() {
if err != nil {
d.nlh.LinkDel(host)
}
}()
// Get the sandbox side pipe interface handler
sbox, err := d.nlh.LinkByName(containerIfName)
if err != nil {
return types.InternalErrorf("failed to find sandbox side interface %s: %v", containerIfName, err)
}
// Cleanup: delete the sandbox-side link if a later step fails.
defer func() {
if err != nil {
d.nlh.LinkDel(sbox)
}
}()
// Snapshot the network configuration under the lock.
n.Lock()
config := n.config
n.Unlock()
// Add bridge inherited attributes to pipe interfaces
// A non-zero configured MTU is applied to both ends of the pair.
if config.Mtu != 0 {
err = d.nlh.LinkSetMTU(host, config.Mtu)
if err != nil {
return types.InternalErrorf("failed to set MTU on host interface %s: %v", hostIfName, err)
}
err = d.nlh.LinkSetMTU(sbox, config.Mtu)
if err != nil {
return types.InternalErrorf("failed to set MTU on sandbox interface %s: %v", containerIfName, err)
}
}
// Attach host side pipe interface into the bridge
// Example observed by the article author: config.BridgeName is docker0.
if err = addToBridge(d.nlh, hostIfName, config.BridgeName); err != nil {
return fmt.Errorf("adding interface %s to bridge %s failed: %v", hostIfName, config.BridgeName, err)
}
if !dconfig.EnableUserlandProxy {
// Hairpin mode is enabled on the host end only when the userland proxy is disabled.
err = setHairpinMode(d.nlh, host, true)
if err != nil {
return err
}
}
// Store the sandbox side pipe interface parameters
endpoint.srcName = containerIfName
endpoint.macAddress = ifInfo.MacAddress()
endpoint.addr = ifInfo.Address()
endpoint.addrv6 = ifInfo.AddressIPv6()
// Set the sbox's MAC if not provided. If specified, use the one configured by user, otherwise generate one based on IP.
if endpoint.macAddress == nil {
endpoint.macAddress = electMacAddress(epConfig, endpoint.addr.IP)
if err = ifInfo.SetMacAddress(endpoint.macAddress); err != nil {
return err
}
}
// Up the host interface after finishing all netlink configuration
if err = d.nlh.LinkSetUp(host); err != nil {
return fmt.Errorf("could not set link up for host interface %s: %v", hostIfName, err)
}
// No IPv6 address was supplied but IPv6 is enabled: build one by copying the
// network prefix and writing the MAC address bytes starting at byte offset 10.
if endpoint.addrv6 == nil && config.EnableIPv6 {
var ip6 net.IP
network := n.bridge.bridgeIPv6
if config.AddressIPv6 != nil {
network = config.AddressIPv6
}
// A prefix longer than 80 bits leaves fewer than 48 host bits.
ones, _ := network.Mask.Size()
if ones > 80 {
err = types.ForbiddenErrorf("Cannot self generate an IPv6 address on network %v: At least 48 host bits are needed.", network)
return err
}
ip6 = make(net.IP, len(network.IP))
copy(ip6, network.IP)
for i, h := range endpoint.macAddress {
ip6[i+10] = h
}
endpoint.addrv6 = &net.IPNet{IP: ip6, Mask: network.Mask}
if err = ifInfo.SetIPAddress(endpoint.addrv6); err != nil {
return err
}
}
// Save the endpoint through the driver's store.
if err = d.storeUpdate(endpoint); err != nil {
return fmt.Errorf("failed to save bridge endpoint %s to store: %v", endpoint.id[0:7], err)
}
return nil
}

endpoint

最后一个小节,我们来看下endpoint的Delete()和Join()实现。

endpoint::Delete()

Delete()可以删除本endpoint。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
// Delete removes this endpoint from its network.
//
// The network and the endpoint are first re-read from the store. If the
// endpoint is still attached to a sandbox, Delete fails with
// ActiveContainerError unless force is set, in which case the endpoint is
// made to leave the sandbox first (a leave failure is only logged). The
// endpoint is then removed from the store, its service-record watch is
// dropped, the driver-side endpoint is deleted, its address is released and
// the network's endpoint count is decremented.
//
// When force is false and the driver-side delete fails, the deferred
// rollback writes the endpoint back to the store and the error is returned.
// When force is true, driver-side failures are ignored.
func (ep *endpoint) Delete(force bool) error {
	n, err := ep.getNetworkFromStore()
	if err != nil {
		return fmt.Errorf("failed to get network during Delete: %v", err)
	}

	// Work on the latest stored copy of the endpoint.
	ep, err = n.getEndpointFromStore(ep.ID())
	if err != nil {
		return fmt.Errorf("failed to get endpoint from store during Delete: %v", err)
	}

	ep.Lock()
	epid := ep.id
	name := ep.name
	sbid := ep.sandboxID
	ep.Unlock()

	// The lookup error is deliberately ignored: only a non-nil sandbox matters here.
	sb, _ := n.getController().SandboxByID(sbid)
	if sb != nil && !force {
		return &ActiveContainerError{name: name, id: epid}
	}

	if sb != nil {
		if e := ep.sbLeave(sb.(*sandbox), force); e != nil {
			log.Warnf("failed to leave sandbox for endpoint %s : %v", name, e)
		}
	}

	// Remove the endpoint from the store.
	if err = n.getController().deleteFromStore(ep); err != nil {
		return err
	}
	// Rollback: put the endpoint back in the store if a later step fails
	// and the delete was not forced.
	defer func() {
		if err != nil && !force {
			ep.dbExists = false
			if e := n.getController().updateToStore(ep); e != nil {
				log.Warnf("failed to recreate endpoint in store %s : %v", name, e)
			}
		}
	}()

	// unwatch for service records
	n.getController().unWatchSvcRecord(ep)

	// Delete the driver-side endpoint.
	if err = ep.deleteEndpoint(force); err != nil && !force {
		return err
	}

	ep.releaseAddress()

	// This err is scoped to the if statement, so a failed decrement is only
	// logged and never reaches the deferred rollback above.
	if err := n.getEpCnt().DecEndpointCnt(); err != nil {
		log.Warnf("failed to decrement endpoint count for ep %s: %v", ep.ID(), err)
	}

	return nil
}
// deleteEndpoint asks the network's driver to delete this endpoint.
//
// The driver is resolved with loading enabled only when force is false; if
// no driver is returned, there is nothing to do and nil is returned. A
// types.ForbiddenError from the driver is returned to the caller. Any other
// driver error is absorbed: it is logged as a warning unless it is a
// types.MaskableError.
func (ep *endpoint) deleteEndpoint(force bool) error {
	ep.Lock()
	nw, epName, epID := ep.network, ep.name, ep.id
	ep.Unlock()

	drv, err := nw.driver(!force)
	if err != nil {
		return fmt.Errorf("failed to delete endpoint: %v", err)
	}
	if drv == nil {
		return nil
	}

	delErr := drv.DeleteEndpoint(nw.id, epID)
	if delErr == nil {
		return nil
	}

	switch delErr.(type) {
	case types.ForbiddenError:
		return delErr
	case types.MaskableError:
		// Maskable errors are dropped without logging.
	default:
		log.Warnf("driver error deleting endpoint %s : %v", epName, delErr)
	}
	return nil
}

以bridge为例,driver的DeleteEndpoint()定义在/libnetwork/drivers/bridge/bridge.go中:

driver::DeleteEndpoint()

DeleteEndpoint()完成vethpair的删除。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
// DeleteEndpoint removes the bridge-driver endpoint eid from network nid.
//
// It validates the network and the endpoint, removes the endpoint from the
// network's endpoint map, makes a best-effort attempt to delete the link named
// by the endpoint's srcName, and removes the endpoint from the driver store
// (a store failure is only logged). Errors are returned only for the lookup
// and validation steps.
func (d *driver) DeleteEndpoint(nid, eid string) error {
var err error
defer osl.InitOSContext()()
// Get the network handler and make sure it exists
d.Lock()
n, ok := d.networks[nid]
d.Unlock()
if !ok {
return types.InternalMaskableErrorf("network %s does not exist", nid)
}
if n == nil {
return driverapi.ErrNoNetwork(nid)
}
// Sanity Check
n.Lock()
if n.id != nid {
n.Unlock()
return InvalidNetworkIDError(nid)
}
n.Unlock()
// Check endpoint id and if an endpoint is actually there
ep, err := n.getEndpoint(eid)
if err != nil {
return err
}
if ep == nil {
return EndpointNotFoundError(eid)
}
// Remove it
n.Lock()
delete(n.endpoints, eid)
n.Unlock()
// On failure make sure to set back ep in n.endpoints, but only
// if it hasn't been taken over already by some other thread.
// NOTE(review): every error below is bound to a new, block-scoped err, so
// the function-scope err tested here is still nil when this runs.
defer func() {
if err != nil {
n.Lock()
if _, ok := n.endpoints[eid]; !ok {
n.endpoints[eid] = ep
}
n.Unlock()
}
}()
// Try removal of link. Discard error: it is a best effort.
// Also make sure defer does not see this error either.
// Deleting the link removes the veth pair (article author's note).
if link, err := d.nlh.LinkByName(ep.srcName); err == nil {
d.nlh.LinkDel(link)
}
if err := d.storeDelete(ep); err != nil {
logrus.Warnf("Failed to remove bridge endpoint %s from store: %v", ep.id[0:7], err)
}
return nil
}

endpoint::Join()

Join()主要调用了sbJoin(),sbJoin()的流程如下:

  1. 获取最新的network和endpoint;
  2. 获取driver,并调用driver的Join();
  3. 获取ip,并赋给address,并把address更新到sandbox中;
  4. 更新endpoint。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
// Join attaches the endpoint to the given sandbox.
//
// sbox must be non-nil and must be a *sandbox; otherwise a bad-request error
// is returned. The actual work is done by sbJoin(), bracketed by the
// sandbox's joinLeaveStart()/joinLeaveEnd() calls.
func (ep *endpoint) Join(sbox Sandbox, options ...EndpointOption) error {
	if sbox == nil {
		return types.BadRequestErrorf("endpoint cannot be joined by nil container")
	}

	switch sb := sbox.(type) {
	case *sandbox:
		sb.joinLeaveStart()
		defer sb.joinLeaveEnd()
		return ep.sbJoin(sb, options...)
	default:
		return types.BadRequestErrorf("not a valid Sandbox interface")
	}
}
// sbJoin joins the endpoint to sandbox sb.
//
// It re-reads the network and the endpoint from the store, records the
// sandbox ID on the endpoint (failing if another sandbox already holds it),
// calls the driver's Join(), updates the sandbox's hosts file and DNS,
// persists the endpoint, adds it to the sandbox's endpoint heap and calls
// populateNetworkResources().
//
// Deferred rollbacks clear the sandbox ID, call the driver's Leave() and
// remove the endpoint from the sandbox when the function-scope err is
// non-nil on return.
//
// NOTE: the listing is abridged by the article author at the "......" line
// near the end, so the tail of the function is not shown.
func (ep *endpoint) sbJoin(sb *sandbox, options ...EndpointOption) error {
// Fetch the network from the store.
n, err := ep.getNetworkFromStore()
if err != nil {
return fmt.Errorf("failed to get network from store during join: %v", err)
}
// Fetch the latest copy of the endpoint from the store.
ep, err = n.getEndpointFromStore(ep.ID())
if err != nil {
return fmt.Errorf("failed to get endpoint from store during join: %v", err)
}
ep.Lock()
// A non-empty sandboxID means the endpoint is already attached elsewhere.
if ep.sandboxID != "" {
ep.Unlock()
return types.ForbiddenErrorf("another container is attached to the same network endpoint")
}
ep.network = n
ep.sandboxID = sb.ID()
ep.joinInfo = &endpointJoinInfo{}
epid := ep.id
ep.Unlock()
// Rollback: clear the sandbox ID again if a later step fails.
defer func() {
if err != nil {
ep.Lock()
ep.sandboxID = ""
ep.Unlock()
}
}()
nid := n.ID()
ep.processOptions(options...)
// Resolve the driver, loading it if necessary.
d, err := n.driver(true)
if err != nil {
return fmt.Errorf("failed to join endpoint: %v", err)
}
// Let the driver perform its part of the join.
err = d.Join(nid, epid, sb.Key(), ep, sb.Labels())
if err != nil {
return err
}
// Rollback: make the driver leave again if a later step fails.
defer func() {
if err != nil {
if err := d.Leave(nid, epid); err != nil {
log.Warnf("driver leave failed while rolling back join: %v", err)
}
}
}()
// Watch for service records
if !n.getController().isAgent() {
n.getController().watchSvcRecord(ep)
}
address := ""
// Use the endpoint's first interface address, if any, as the address string.
if ip := ep.getFirstInterfaceAddress(); ip != nil {
address = ip.String()
}
// Update the sandbox: hosts file first, then DNS.
if err = sb.updateHostsFile(address); err != nil {
return err
}
if err = sb.updateDNS(n.enableIPv6); err != nil {
return err
}
// Persist the updated endpoint in the store.
if err = n.getController().updateToStore(ep); err != nil {
return err
}
// Current endpoint providing external connectivity for the sandbox
extEp := sb.getGatewayEndpoint()
sb.Lock()
heap.Push(&sb.endpoints, ep)
sb.Unlock()
// Rollback: remove the endpoint from the sandbox if a later step fails.
defer func() {
if err != nil {
sb.removeEndpoint(ep)
}
}()
// populateNetworkResources() is what actually plumbs the endpoint into the sandbox.
if err = sb.populateNetworkResources(ep); err != nil {
return err
}
// A failure to add the endpoint to the cluster is logged, not returned.
if e := ep.addToCluster(); e != nil {
log.Errorf("Could not update state for endpoint %s into cluster: %v", ep.Name(), e)
}
if sb.needDefaultGW() && sb.getEndpointInGWNetwork() == nil {
return sb.setupDefaultGW()
}
moveExtConn := sb.getGatewayEndpoint() != extEp
// (remainder of the function omitted in the article)
......
return nil
}

以bridge为例,driver的Join()定义在/libnetwork/drivers/bridge/bridge.go中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
// Join performs the bridge driver's part of attaching endpoint eid of network
// nid to a sandbox.
//
// It looks up the network and the endpoint (which carries the sandbox-side
// interface name), stores the parsed container options on the endpoint, and
// then fills in jinfo: the interface names (source name plus the container
// veth prefix) and the bridge's IPv4 and IPv6 gateways. The first failing
// step's error is returned.
func (d *driver) Join(nid, eid string, sboxKey string, jinfo driverapi.JoinInfo, options map[string]interface{}) error {
	defer osl.InitOSContext()()

	nw, err := d.getNetwork(nid)
	if err != nil {
		return err
	}

	ep, err := nw.getEndpoint(eid)
	if err != nil {
		return err
	}
	if ep == nil {
		return EndpointNotFoundError(eid)
	}

	if ep.containerConfig, err = parseContainerOptions(options); err != nil {
		return err
	}

	if err = jinfo.InterfaceName().SetNames(ep.srcName, containerVethPrefix); err != nil {
		return err
	}

	if err = jinfo.SetGateway(nw.bridge.gatewayIPv4); err != nil {
		return err
	}

	return jinfo.SetGatewayIPv6(nw.bridge.gatewayIPv6)
}

当然,driver的Join()只需要对endpoint做进一步的设置。
那么endpoint是何时加入到sandbox中的呢?答案是populateNetworkResources():在设置完endpoint后,endpoint的Join()会(通过sbJoin())调用populateNetworkResources()。
populateNetworkResources()定义在/libnetwork/sandbox.go中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
// populateNetworkResources plumbs endpoint ep into the sandbox.
//
// If the sandbox has no osSbox it returns nil immediately. Otherwise it
// starts the resolver when the endpoint needs one, adds the endpoint's
// interface to the OS sandbox with the options built from the endpoint
// (address, routes, IPv6 address, link-local addresses, virtual-IP alias,
// MAC), installs the static routes from the join info, updates the gateway
// when ep is the sandbox's gateway endpoint, records ep as populated, and
// populates the load balancers. Unless the sandbox is being deleted, the
// sandbox is finally written to the store and that call's error is returned.
func (sb *sandbox) populateNetworkResources(ep *endpoint) error {
sb.Lock()
if sb.osSbox == nil {
sb.Unlock()
return nil
}
// Remember whether the sandbox is being deleted; used at the end.
inDelete := sb.inDelete
sb.Unlock()
// Snapshot the join info and interface under the endpoint lock.
ep.Lock()
joinInfo := ep.joinInfo
i := ep.iface
ep.Unlock()
if ep.needResolver() {
sb.startResolver(false)
}
// Only endpoints with an interface that has a source name get an interface added.
if i != nil && i.srcName != "" {
var ifaceOptions []osl.IfaceOption
ifaceOptions = append(ifaceOptions, sb.osSbox.InterfaceOptions().Address(i.addr), sb.osSbox.InterfaceOptions().Routes(i.routes))
if i.addrv6 != nil && i.addrv6.IP.To16() != nil {
ifaceOptions = append(ifaceOptions, sb.osSbox.InterfaceOptions().AddressIPv6(i.addrv6))
}
if len(i.llAddrs) != 0 {
ifaceOptions = append(ifaceOptions, sb.osSbox.InterfaceOptions().LinkLocalAddresses(i.llAddrs))
}
// A virtual IP, when present, is added as a /32 alias.
if len(ep.virtualIP) != 0 {
vipAlias := &net.IPNet{IP: ep.virtualIP, Mask: net.CIDRMask(32, 32)}
ifaceOptions = append(ifaceOptions, sb.osSbox.InterfaceOptions().IPAliases([]*net.IPNet{vipAlias}))
}
if i.mac != nil {
ifaceOptions = append(ifaceOptions, sb.osSbox.InterfaceOptions().MacAddress(i.mac))
}
// AddInterface() is the call that adds the endpoint's interface to the OS sandbox.
if err := sb.osSbox.AddInterface(i.srcName, i.dstPrefix, ifaceOptions...); err != nil {
return fmt.Errorf("failed to add interface %s to sandbox: %v", i.srcName, err)
}
}
if joinInfo != nil {
// Set up non-interface routes.
for _, r := range joinInfo.StaticRoutes {
if err := sb.osSbox.AddStaticRoute(r); err != nil {
return fmt.Errorf("failed to add static route %s: %v", r.Destination.String(), err)
}
}
}
// Update the sandbox gateway when ep is the current gateway endpoint.
if ep == sb.getGatewayEndpoint() {
if err := sb.updateGateway(ep); err != nil {
return err
}
}
// Make sure to add the endpoint to the populated endpoint set
// before populating loadbalancers.
sb.Lock()
sb.populatedEndpoints[ep.ID()] = struct{}{}
sb.Unlock()
// Populate load balancer only after updating all the other
// information including gateway and other routes so that
// loadbalancers are populated all the network state is in
// place in the sandbox.
sb.populateLoadbalancers(ep)
// Only update the store if we did not come here as part of
// sandbox delete. If we came here as part of delete then do
// not bother updating the store. The sandbox object will be
// deleted anyway
if !inDelete {
return sb.storeUpdate()
}
return nil
}

populateNetworkResources()通过osSbox.AddInterface()把endpoint加入到sandbox中,AddInterface()定义在/libnetwork/osl/interface_linux.go,这里就不再展开分析了。